package com.xiaofan.apitest.state

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.{FsStateBackend, FsStateBackendFactory}
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.util
import java.util.concurrent.TimeUnit

/**
 * Stateful stream processing demo.
 *
 * Reads comma-separated sensor readings ("id,timestamp,temperature") from a socket,
 * keys them by sensor id and emits (id, previousTemperature, currentTemperature)
 * whenever two consecutive readings of the same sensor differ by more than 10 degrees.
 */
object StateTest {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Alternative state backends (pick one and supply a real checkpoint path):
    //    env.setStateBackend(new MemoryStateBackend())
    //    env.setStateBackend(new FsStateBackend(""))
    //    env.setStateBackend(new RocksDBStateBackend(""))

    // Checkpoint configuration. The interval is set once; an earlier
    // enableCheckpointing(500) call was immediately overridden by this one.
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointTimeout(60000L)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L)
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
    // Further optional settings:
    //    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    //    env.getCheckpointConfig.setPreferCheckpointForRecovery(true)

    // Restart strategy. Only the last setRestartStrategy call takes effect, so the
    // alternative is kept as a comment instead of being set and then overwritten.
    // 1. Fixed delay: the arguments are the number of restart attempts (3) and the
    //    delay between attempts in milliseconds (10000).
    //    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
    // 2. Failure rate: the arguments are a failure count (5), a measurement interval
    //    (5 minutes) and the delay between restart attempts (10 seconds).
    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))

    // The host literal previously contained stray spaces ("19  2.168.1.27"),
    // which is not a valid address.
    val inputStream: DataStream[String] = env.socketTextStream("192.168.1.27", 9999)

    // Parse "id,timestamp,temperature" lines into SensorReading records.
    val dataStream: DataStream[SensorReading] = inputStream.map(
      data => {
        val arr: Array[String] = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
    )

    // Requirement: raise an alert when a sensor's temperature jumps by more than 10 degrees.
    val warningStream: DataStream[(String, Double, Double)] = dataStream
      .keyBy(_.id)
      // Option 1: .flatMap(new TimeChangeAlert(10.0))
      // Option 2: stateful function, only available after keyBy:
      //   fun: (T, Option[S]) => (TraversableOnce[R], Option[S])
      .flatMapWithState[(String, Double, Double), Double] {

        // No previous temperature for this key: emit nothing, remember the current one.
        case (data: SensorReading, None) => (List.empty, Some(data.temperature))

        // Destructure with the Some extractor rather than an erased `Some[Double]`
        // type test followed by `.get`.
        case (data: SensorReading, Some(lastTemp)) =>
          val diff = (data.temperature - lastTemp).abs

          if (diff > 10.0) {
            (List((data.id, lastTemp, data.temperature)), Some(data.temperature))
          } else {
            (List.empty, Some(data.temperature))
          }
      }

    warningStream.print()


    env.execute("state test")
  }
}


/**
 * Custom RichFlatMapFunction that compares each reading with the previously stored
 * temperature and emits (id, previousTemperature, currentTemperature) when the
 * absolute difference exceeds `threshold`.
 *
 * @param threshold difference above which an alert tuple is emitted
 */
class TimeChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {
  // State holding the temperature of the previous reading.
  lazy val lastTempState: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double]))
  // Flag state: set to true once a first reading has been processed.
  lazy val firstState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isFirst", classOf[Boolean]))

  override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    val previous: Double = lastTempState.value()
    val seenBefore: Boolean = firstState.value()

    if (!seenBefore) {
      // First reading: nothing to compare against, just flip the flag.
      firstState.update(true)
    } else if ((value.temperature - previous).abs > threshold) {
      out.collect((value.id, previous, value.temperature))
    }

    // Always remember the current temperature for the next comparison.
    lastTempState.update(value.temperature)
  }
}


/**
 * Showcase of the keyed state handles and their basic operations.
 *
 * Keyed state has to be declared inside a RichFunction because the handles are
 * obtained from the runtime context.
 */
class MyRichMapper extends RichMapFunction[SensorReading, String] {
  // Style 1: declare the handle as a var and assign it in open().
  var valueState: ValueState[Double] = _

  // Style 2: lazy vals, resolved on first access.
  lazy val listState: ListState[Int] = {
    val descriptor = new ListStateDescriptor[Int]("liststate", classOf[Int])
    getRuntimeContext.getListState(descriptor)
  }
  lazy val mapState: MapState[String, Double] = {
    val descriptor = new MapStateDescriptor[String, Double]("mapstate", classOf[String], classOf[Double])
    getRuntimeContext.getMapState(descriptor)
  }
  lazy val reducingState: ReducingState[SensorReading] = {
    val descriptor = new ReducingStateDescriptor[SensorReading]("reducingstate", new MyReduceFunction, classOf[SensorReading])
    getRuntimeContext.getReducingState(descriptor)
  }

  override def open(parameters: Configuration): Unit = {
    val descriptor = new ValueStateDescriptor[Double]("valuestate", classOf[Double])
    valueState = getRuntimeContext.getState(descriptor)
  }

  override def map(value: SensorReading): String = {
    // Value state read/write would look like:
    // val temperature: Double = valueState.value()
    // valueState.update(value.temperature)

    // List state: add one element, add a batch, update with a batch, read back.
    listState.add(1)
    val batch = new util.ArrayList[Int]()
    batch.add(2)
    batch.add(3)
    listState.addAll(batch)
    listState.update(batch)
    listState.get()

    // Map state: membership check, put, get.
    val sensorKey = "sensor_1"
    mapState.contains(sensorKey)
    mapState.put(sensorKey, 1.3)
    mapState.get(sensorKey)

    ""
  }
}

/**
 * Sample custom aggregation: keeps the id of the accumulated reading, takes the
 * timestamp of the incoming reading, and retains the smaller of the two temperatures.
 */
class MyReduceFunction extends ReduceFunction[SensorReading] {
  override def reduce(curState: SensorReading, newData: SensorReading): SensorReading = {
    val lowestTemperature = math.min(curState.temperature, newData.temperature)
    SensorReading(curState.id, newData.timestamp, lowestTemperature)
  }
}
